package co.recloud.ariadne.model.plan;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import co.recloud.ariadne.model.logical.Column;
import co.recloud.ariadne.model.logical.Condition;
import co.recloud.ariadne.model.logical.Table;
import co.recloud.ariadne.model.logical.Transaction;
import co.recloud.ariadne.persistor.KeyKeyValuePersistor;
import co.recloud.ariadne.persistor.KeyKeyValuePersistorImpl;
import co.recloud.ariadne.persistor.KeyValuePersistor;
import co.recloud.ariadne.persistor.KeyValuePersistorImpl;

public class HashJoin extends Operator {
	/**
	 * 
	 */
	private static final long serialVersionUID = 1737425940512906070L;

	public HashJoin(Transaction transaction) {
		super(transaction);
		// TODO Auto-generated constructor stub
	}

	private Condition condition;
	private Column clusterColumn;
	private Table startTable;

	/**
	 * @return the leftKey
	 */
	public Column getLeftKey() {
		return this.condition.getLeftColumn();
	}

	/**
	 * @param leftKey
	 *            the leftKey to set
	 */
	public void setLeftKey(Column leftKey) {
		this.condition.setLeftColumn(leftKey);
	}

	/**
	 * @return the rightKey
	 */
	public Column getRightKey() {
		return this.condition.getRightColumn();
	}

	/**
	 * @param rightKey
	 *            the rightKey to set
	 */
	public void setRightKey(Column rightKey) {
		this.condition.setRightColumn(rightKey);
	}

	/**
	 * @return the leftChild
	 */
	public Operator getLeftChild() {
		if (this.getChildren() == null || this.getChildren().size() < 1) {
			return null;
		} else {
			return this.getChildren().get(0);
		}
	}

	/**
	 * @param leftChild
	 *            the leftChild to set
	 */
	public void setLeftChild(Operator leftChild) {
		if (this.getChildren() == null) {
			this.setChildren(new ArrayList<Operator>());
		}
		if (this.getChildren().size() < 1) {
			this.getChildren().add(leftChild);
		} else {
			this.getChildren().set(0, leftChild);
		}
	}

	/**
	 * @return the rightChild
	 */
	public Operator getRightChild() {
		if (this.getChildren() == null || this.getChildren().size() < 2) {
			return null;
		} else {
			return this.getChildren().get(1);
		}
	}

	/**
	 * @param rightChild
	 *            the rightChild to set
	 */
	public void setRightChild(Operator rightChild) {
		if (this.getChildren() == null) {
			this.setChildren(new ArrayList<Operator>());
		}
		if (this.getChildren().size() < 1) {
			this.getChildren().add(null);
		}
		if (this.getChildren().size() < 2) {
			this.getChildren().add(rightChild);
		} else {
			this.getChildren().set(1, rightChild);
		}
	}

	/**
	 * @param condition
	 *            the condition to set
	 */
	public void setCondition(Condition condition) {
		this.condition = condition;
	}

	/**
	 * @return the condition
	 */
	public Condition getCondition() {
		return condition;
	}

	public String toString() {
		return "HashJoin: " + this.getLeftChild() + " to "
				+ this.getRightChild() + " on " + this.getCondition();
	}

	public void execute(String localhost, int port) {
		this.getLeftChild().execute(localhost, port);
		this.getRightChild().execute(localhost, port);
		long resultIndex = 1;
		KeyValuePersistor kv = new KeyValuePersistorImpl(this.getTransaction());
		Table leftTable = this.getLeftChild().getTable();
		Table rightTable = this.getRightChild().getTable();
		Table childTable = null;
		Table parentTable = null;
		Table resultTable = this.getTable();
		Column leftConditionColumn = null;
		Column rightConditionColumn = null;
		if (getLeftChild().getBaseTables().contains(
				condition.getLeftColumn().getTable())) {
			leftConditionColumn = condition.getLeftColumn();
			rightConditionColumn = condition.getRightColumn();
		} else {
			leftConditionColumn = condition.getRightColumn();
			rightConditionColumn = condition.getLeftColumn();
		}
		Column childClusterColumn = null;
		Column parentClusterColumn = null;
		if (leftConditionColumn.getField().equals(Table.PRIMARY_KEY)) {
			childTable = rightTable;
			parentTable = leftTable;
			parentClusterColumn = leftConditionColumn;
			childClusterColumn = rightConditionColumn;
		} else {
			childTable = leftTable;
			parentTable = rightTable;
			parentClusterColumn = rightConditionColumn;
			childClusterColumn = leftConditionColumn;
		}
		Set<String> parentKeys = new HashSet<String>();
		Map<String, Set<String>> childClusters = new HashMap<String, Set<String>>();
		if(childTable.isResultSet()) {
			Set<String> resultChildKeys = kv.getAllKeys(childTable);
			Column childResultClusterColumn = new Column(childTable,
					childClusterColumn.getTable().getAlias() + "."
							+ childClusterColumn.getField());
			for (String resultChildKey : resultChildKeys) {
				String parentKey = kv.getValue(resultChildKey,
						childResultClusterColumn);
				if (!childClusters.containsKey(parentKey)) {
					childClusters.put(parentKey, new HashSet<String>());
				}
				childClusters.get(parentKey).add(resultChildKey);
				if (parentKey != null) {
					parentKeys.add(parentKey);
				}
			}
		}
		Map<String, Set<String>> parentClusters = new HashMap<String, Set<String>>();
		if(parentTable.isResultSet()) {
			Set<String> resultParentKeys = kv.getAllKeys(parentTable);
			if(!parentTable.isResultSet() && !parentKeys.isEmpty()) {
				resultParentKeys.addAll(parentKeys);
			}
			for (String resultParentKey : resultParentKeys) {
				Column parentResultClusterColumn = new Column(
						parentTable, parentClusterColumn.getTable().getAlias()
						+ "." + parentClusterColumn.getField());
				String parentKey = kv.getValue(resultParentKey, parentResultClusterColumn);
				if (!parentClusters.containsKey(parentKey)) {
					parentClusters.put(parentKey, new HashSet<String>());
				}
				parentClusters.get(parentKey).add(resultParentKey);
				if (parentKey != null) {
					parentKeys.add(parentKey);
				}
	
			}
		}
		if (childTable.isResultSet() && !parentTable.isResultSet()) {
			for(String parentKey : parentKeys) {
				parentClusters.put(parentKey, new HashSet<String>());
				parentClusters.get(parentKey).add(parentKey);
			}
		}
		if(!childTable.isResultSet() && parentTable.isResultSet()) {
			KeyKeyValuePersistor kkv = new KeyKeyValuePersistorImpl(getTransaction());
			for(String parentKey : parentKeys) {
				childClusters.put(parentKey, new HashSet<String>());
				childClusters.get(parentKey).addAll(kkv.getParentToChild(parentClusterColumn, parentKey, childClusterColumn));
			}
		}
		for (String parentKey : parentKeys) {
			Set<String> childResultRowIds = null;
			Set<String> parentResultRowIds = null;
			if (childClusters != null) {
				childResultRowIds = childClusters.get(parentKey);
			}
			if (parentClusters != null) {
				parentResultRowIds = parentClusters.get(parentKey);
			}
			for (String childResultKey : childResultRowIds) {
				for (String parentResultKey : parentResultRowIds) {
					Map<String, Object> childColumns = kv.getAllColumns(
							childResultKey, childTable);
					Map<String, Object> parentColumns = kv.getAllColumns(
							parentResultKey, parentTable);
					if (childColumns != null && parentColumns != null) {
						String resultKey = new Long(resultIndex).toString();
						for (String childColumnName : childColumns.keySet()) {
							String fullChildColumnName = childColumnName;
							if(!childTable.isResultSet()) {
								fullChildColumnName = new String(childTable.getAlias() + "." + childColumnName);
							}
							Column childColumn = Column.parseColumn(
									fullChildColumnName,
									childClusterColumn.getTable(),
									this.getTableAliases());
							childColumn.setTable(this.getTableAliases().get(
									childColumn.getTable().getAlias()));
							if (childColumn.equals(clusterColumn)) {
							}
							kv.setValue(resultKey, new Column(resultTable,
									fullChildColumnName), (String) childColumns
									.get(childColumnName));
						}
						for (String parentColumnName : parentColumns.keySet()) {
							String fullParentColumnName = parentColumnName;
							if(!parentTable.isResultSet()) {
								fullParentColumnName = new String(parentTable.getAlias() + "." + parentColumnName);
							}
							Column parentColumn = Column.parseColumn(
									fullParentColumnName,
									parentClusterColumn.getTable(),
									this.getTableAliases());
							parentColumn.setTable(this.getTableAliases().get(
									parentColumn.getTable().getAlias()));
							if (parentColumn.equals(clusterColumn)) {
							}
							kv.setValue(resultKey, new Column(resultTable,
									fullParentColumnName), (String) parentColumns
									.get(parentColumnName));
						}
						resultIndex++;
					}
				}
			}
		}
		if (leftTable.isResultSet()) {
			kv.dropTable(leftTable);
		}
		if (rightTable.isResultSet()) {
			kv.dropTable(rightTable);
		}
	}

	/**
	 * @param clusterColumn
	 *            the clusterColumn to set
	 */
	public void setClusterColumn(Column clusterColumn) {
		this.clusterColumn = clusterColumn;
	}

	/**
	 * @return the clusterColumn
	 */
	public Column getClusterColumn() {
		return clusterColumn;
	}

	/**
	 * @param startTable the startTable to set
	 */
	public void setStartTable(Table startTable) {
		this.startTable = startTable;
	}

	/**
	 * @return the startTable
	 */
	public Table getStartTable() {
		return startTable;
	}
}
